Skip to content

feat(coprocessor): implement IAM authentication for coprocessor RDS#2254

Open
antoniupop wants to merge 3 commits intomainfrom
antoniu/iam
Open

feat(coprocessor): implement IAM authentication for coprocessor RDS#2254
antoniupop wants to merge 3 commits intomainfrom
antoniu/iam

Conversation

@antoniupop
Copy link
Copy Markdown
Collaborator

No description provided.

@cla-bot cla-bot Bot added the cla-signed label Apr 8, 2026
@antoniupop
Copy link
Copy Markdown
Collaborator Author

@deasydoesit We need to deploy to a staging environment with an RDS instance that has IAM auth enabled -- this is the only way to test the full end-to-end flow (token generation, TLS handshake, connection pooling, token refresh).

I've tested locally and looks like hte signed url generated is fine.

@github-actions
Copy link
Copy Markdown

github-actions Bot commented Apr 8, 2026

Coprocessor Changed Lines Coverage

Coverage of added/modified lines in coprocessor: 58.0%

Per-file breakdown

Diff Coverage

Diff: origin/main...HEAD, staged and unstaged changes

  • coprocessor/fhevm-engine/fhevm-engine-common/src/bin/resolve_database_url.rs (0.0%): Missing lines 6-11,14-23
  • coprocessor/fhevm-engine/fhevm-engine-common/src/database.rs (58.5%): Missing lines 66-68,72,115-118,137,141-146,148,155-158,160,173,189-193,195-199,201,213,215-223,225,227-243,245-249,251-252,254-267,269-272,274,276-284,287-289,291,295-299,301-302,305,307-309,311,317,319-324,326-329,331-335,337-340,342-351,353-360,414-415
  • coprocessor/fhevm-engine/fhevm-engine-common/src/pg_pool.rs (75.0%): Missing lines 68-71
  • coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs (0.0%): Missing lines 173
  • coprocessor/fhevm-engine/gw-listener/src/gw_listener.rs (46.2%): Missing lines 785-790,792
  • coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs (100%)
  • coprocessor/fhevm-engine/sns-worker/src/bin/sns_worker.rs (0.0%): Missing lines 18,21,23,61,66-68
  • coprocessor/fhevm-engine/tfhe-worker/src/health_check.rs (80.0%): Missing lines 61-63
  • coprocessor/fhevm-engine/tfhe-worker/src/lib.rs (100%)
  • coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs (100%)
  • coprocessor/fhevm-engine/transaction-sender/src/bin/transaction_sender.rs (0.0%): Missing lines 319-325
  • coprocessor/fhevm-engine/zkproof-worker/src/bin/zkproof_worker.rs (0.0%): Missing lines 92-96

Summary

  • Total: 482 lines
  • Missing: 202 lines
  • Coverage: 58%

coprocessor/fhevm-engine/fhevm-engine-common/src/bin/resolve_database_url.rs

   2     resolve_database_url_from_option, resolve_runtime_database_url,
   3 };
   4 
   5 #[tokio::main]
!  6 async fn main() {
!  7     let database_url = match resolve_database_url_from_option(None) {
!  8         Ok(database_url) => database_url,
!  9         Err(err) => {
! 10             eprintln!("failed to resolve DATABASE_URL: {err}");
! 11             std::process::exit(1);
  12         }
  13     };
! 14     match resolve_runtime_database_url(&database_url).await {
! 15         Ok(resolved_url) => {
! 16             println!("{resolved_url}");
! 17         }
! 18         Err(err) => {
! 19             eprintln!("failed to resolve DATABASE_URL: {err}");
! 20             std::process::exit(1);
! 21         }
! 22     }
! 23 }

coprocessor/fhevm-engine/fhevm-engine-common/src/database.rs

  62 #[derive(Default)]
  63 pub struct PoolRefreshHandle(Option<CancellationToken>);
  64 
  65 impl PoolRefreshHandle {
! 66     fn new(cancel_token: CancellationToken) -> Self {
! 67         Self(Some(cancel_token))
! 68     }
  69 
  70     pub fn cancel(&mut self) {
  71         if let Some(token) = self.0.take() {
! 72             token.cancel();
  73         }
  74     }
  75 }

  111         DatabaseAuth::Static => database_url
  112             .parse()
  113             .map(connect_options_transform)
  114             .map_err(|err| sqlx::Error::Configuration(Box::new(err)))?,
! 115         DatabaseAuth::AwsIam(config) => config
! 116             .connect_options(connect_options_transform)
! 117             .await
! 118             .map_err(|err| sqlx::Error::Configuration(Box::new(err)))?,
  119     };
  120 
  121     let pool = pool_options.connect_with(connect_options).await?;
  122     let refresh_handle =

  133         DatabaseAuth::Static => database_url
  134             .parse()
  135             .map(connect_options_transform)
  136             .map_err(Into::into),
! 137         DatabaseAuth::AwsIam(config) => config.connect_options(connect_options_transform).await,
  138     }
  139 }
  140 
! 141 pub async fn resolve_runtime_database_url(
! 142     database_url: &DatabaseURL,
! 143 ) -> Result<String, DatabaseConnectionError> {
! 144     match DatabaseAuth::from_env(database_url) {
! 145         DatabaseAuth::Static => Ok(database_url.as_str().to_owned()),
! 146         DatabaseAuth::AwsIam(config) => config.render_database_url().await,
  147     }
! 148 }
  149 
  150 pub fn resolve_database_url_from_option(
  151     database_url: Option<DatabaseURL>,
  152 ) -> Result<DatabaseURL, DatabaseConnectionError> {

  151     database_url: Option<DatabaseURL>,
  152 ) -> Result<DatabaseURL, DatabaseConnectionError> {
  153     match database_url {
  154         Some(database_url) => Ok(database_url),
! 155         None => match std::env::var(DATABASE_URL_ENV) {
! 156             Ok(database_url) => DatabaseURL::from_str(&database_url).map_err(Into::into),
! 157             Err(_) if env_flag_enabled(DATABASE_IAM_AUTH_ENABLED) => {
! 158                 Err(DatabaseConnectionError::MissingDatabaseUrl)
  159             }
! 160             Err(_) => Ok(DatabaseURL::default()),
  161         },
  162     }
  163 }

  169 
  170 impl DatabaseAuth {
  171     fn from_env(database_url: &DatabaseURL) -> Self {
  172         if env_flag_enabled(DATABASE_IAM_AUTH_ENABLED) {
! 173             Self::AwsIam(AwsIamConfig::from_env(database_url.clone()))
  174         } else {
  175             Self::Static
  176         }
  177     }

  185         let Self::AwsIam(config) = self else {
  186             return PoolRefreshHandle::default();
  187         };
  188 
! 189         let cancel_token = refresh_parent
! 190             .map(CancellationToken::child_token)
! 191             .unwrap_or_default();
! 192         let task_cancel_token = cancel_token.clone();
! 193         let config = config.clone();
  194 
! 195         tokio::spawn(async move {
! 196             config
! 197                 .refresh_pool_connect_options(pool, task_cancel_token, connect_options_transform)
! 198                 .await;
! 199         });
  200 
! 201         PoolRefreshHandle::new(cancel_token)
  202     }
  203 }
  204 
  205 #[derive(Clone)]

  209     ssl_root_cert_path: Option<String>,
  210 }
  211 
  212 impl AwsIamConfig {
! 213     fn from_env(database_url: DatabaseURL) -> Self {
  214         Self {
! 215             database_url,
! 216             region_override: std::env::var(DATABASE_IAM_REGION)
! 217                 .ok()
! 218                 .map(|value| value.trim().to_owned())
! 219                 .filter(|value| !value.is_empty()),
! 220             ssl_root_cert_path: std::env::var(DATABASE_SSL_ROOT_CERT_PATH)
! 221                 .ok()
! 222                 .map(|value| value.trim().to_owned())
! 223                 .filter(|value| !value.is_empty()),
  224         }
! 225     }
  226 
! 227     async fn connect_options(
! 228         &self,
! 229         connect_options_transform: fn(PgConnectOptions) -> PgConnectOptions,
! 230     ) -> Result<PgConnectOptions, DatabaseConnectionError> {
! 231         let ssl_root_cert_path = self
! 232             .ssl_root_cert_path
! 233             .as_deref()
! 234             .ok_or(DatabaseConnectionError::MissingSslRootCertPath)?;
! 235         let context = self.build_auth_context().await?;
! 236         let auth_token = generate_rds_iam_token(
! 237             &context.host,
! 238             context.port,
! 239             &context.username,
! 240             &context.region,
! 241             context.credentials,
! 242             SystemTime::now(),
! 243         )?;
  244 
! 245         let mut options = self.database_url.parse()?;
! 246         options = options
! 247             .password(&auth_token)
! 248             .ssl_mode(PgSslMode::VerifyFull)
! 249             .ssl_root_cert(ssl_root_cert_path);
  250 
! 251         Ok(connect_options_transform(options))
! 252     }
  253 
! 254     async fn render_database_url(&self) -> Result<String, DatabaseConnectionError> {
! 255         let ssl_root_cert_path = self
! 256             .ssl_root_cert_path
! 257             .as_deref()
! 258             .ok_or(DatabaseConnectionError::MissingSslRootCertPath)?;
! 259         let context = self.build_auth_context().await?;
! 260         let auth_token = generate_rds_iam_token(
! 261             &context.host,
! 262             context.port,
! 263             &context.username,
! 264             &context.region,
! 265             context.credentials,
! 266             SystemTime::now(),
! 267         )?;
  268 
! 269         render_database_url_with_auth_token(
! 270             self.database_url.as_str(),
! 271             &auth_token,
! 272             Some(ssl_root_cert_path),
  273         )
! 274     }
  275 
! 276     async fn refresh_pool_connect_options(
! 277         &self,
! 278         pool: Pool<Postgres>,
! 279         cancel_token: CancellationToken,
! 280         connect_options_transform: fn(PgConnectOptions) -> PgConnectOptions,
! 281     ) {
! 282         let refresh_interval = IAM_TOKEN_TTL
! 283             .checked_sub(IAM_REFRESH_MARGIN)
! 284             .unwrap_or(IAM_TOKEN_TTL);
  285 
  286         loop {
! 287             tokio::select! {
! 288                 _ = cancel_token.cancelled() => {
! 289                     return;
  290                 }
! 291                 _ = tokio::time::sleep(refresh_interval) => {}
  292             }
  293 
  294             loop {
! 295                 match self.connect_options(connect_options_transform).await {
! 296                     Ok(connect_options) => {
! 297                         pool.set_connect_options(connect_options);
! 298                         info!(database_url = %self.database_url, "Refreshed PostgreSQL IAM auth token");
! 299                         break;
  300                     }
! 301                     Err(err) => {
! 302                         warn!(
  303                             error = %err,
  304                             database_url = %self.database_url,
! 305                             "Failed to refresh PostgreSQL IAM auth token; retrying"
  306                         );
! 307                         tokio::select! {
! 308                             _ = cancel_token.cancelled() => {
! 309                                 return;
  310                             }
! 311                             _ = tokio::time::sleep(IAM_REFRESH_RETRY_DELAY) => {}
  312                         }
  313                     }
  314                 }
  315             }

  313                     }
  314                 }
  315             }
  316         }
! 317     }
  318 
! 319     async fn build_auth_context(&self) -> Result<AwsAuthContext, DatabaseConnectionError> {
! 320         let connect_options = self.database_url.parse()?;
! 321         let host = connect_options.get_host().to_owned();
! 322         if host.is_empty() {
! 323             return Err(DatabaseConnectionError::MissingHost);
! 324         }
  325 
! 326         let username = connect_options.get_username().to_owned();
! 327         if username.is_empty() {
! 328             return Err(DatabaseConnectionError::MissingUsername);
! 329         }
  330 
! 331         let mut loader = aws_config::defaults(BehaviorVersion::latest());
! 332         if let Some(region) = &self.region_override {
! 333             loader = loader.region(Region::new(region.clone()));
! 334         }
! 335         let shared_config = loader.load().await;
  336 
! 337         let region = shared_config
! 338             .region()
! 339             .map(|region| region.as_ref().to_owned())
! 340             .ok_or(DatabaseConnectionError::MissingAwsRegion)?;
  341 
! 342         let credentials = shared_config
! 343             .credentials_provider()
! 344             .ok_or_else(|| {
! 345                 DatabaseConnectionError::AwsCredentials(
! 346                     "no AWS credentials provider is configured".to_owned(),
! 347                 )
! 348             })?
! 349             .provide_credentials()
! 350             .await
! 351             .map_err(|err| DatabaseConnectionError::AwsCredentials(err.to_string()))?;
  352 
! 353         Ok(AwsAuthContext {
! 354             host,
! 355             port: connect_options.get_port(),
! 356             username,
! 357             region,
! 358             credentials,
! 359         })
! 360     }
  361 }
  362 
  363 struct AwsAuthContext {
  364     host: String,

  410 ) -> Result<String, DatabaseConnectionError> {
  411     let mut url = Url::parse(database_url)
  412         .map_err(|err| DatabaseConnectionError::UrlRendering(err.to_string()))?;
  413     url.set_password(Some(auth_token)).map_err(|_| {
! 414         DatabaseConnectionError::UrlRendering("database URL cannot accept a password".to_owned())
! 415     })?;
  416     apply_iam_ssl_settings_to_url(&mut url, ssl_root_cert_path);
  417     Ok(url.to_string())
  418 }

coprocessor/fhevm-engine/fhevm-engine-common/src/pg_pool.rs

  64             {
  65                 Ok((p, refresh_handle)) => {
  66                     break (p, Arc::new(refresh_handle));
  67                 }
! 68                 Err(err) => {
! 69                     error!( error=%err, "Failed to create initial DB pool; retrying...");
! 70                     sleep(retry_db_conn_interval).await;
! 71                     continue;
  72                 }
  73                 }
  74         };

coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs

  169 
  170     let cancel_token = CancellationToken::new();
  171 
  172     let config = ConfigSettings {
! 173         database_url: resolve_database_url_from_option(conf.database_url.clone())?,
  174         database_pool_size: conf.database_pool_size,
  175         verify_proof_req_db_channel: conf.verify_proof_req_database_channel,
  176         gw_url: conf.gw_url,
  177         error_sleep_initial_secs: conf.error_sleep_initial_secs,

coprocessor/fhevm-engine/gw-listener/src/gw_listener.rs

  781         let mut blockchain_connected = false;
  782         let mut error_details = Vec::new();
  783 
  784         // Check database connection
! 785         let db_pool_result = match connect_options_for_database_url(&self.conf.database_url).await {
! 786             Ok(connect_options) => {
! 787                 PgPoolOptions::new()
! 788                     .max_connections(self.conf.database_pool_size)
! 789                     .connect_with(connect_options)
! 790                     .await
  791             }
! 792             Err(err) => Err(sqlx::Error::Configuration(Box::new(err))),
  793         };
  794 
  795         match db_pool_result {
  796             Ok(pool) => {

coprocessor/fhevm-engine/sns-worker/src/bin/sns_worker.rs

  14         token.cancel();
  15     });
  16 }
  17 
! 18 fn construct_config() -> Result<Config, fhevm_engine_common::database::DatabaseConnectionError> {
  19     let args: utils::daemon_cli::Args = utils::daemon_cli::parse_args();
  20 
! 21     let db_url = resolve_database_url_from_option(args.database_url.clone())?;
  22 
! 23     Ok(Config {
  24         service_name: args.service_name,
  25         metrics: SNSMetricsConfig {
  26             addr: args.metrics_addr,
  27             gauge_update_interval_secs: args.gauge_update_interval_secs,

  57         },
  58         enable_compression: args.enable_compression,
  59         schedule_policy: args.schedule_policy,
  60         pg_auto_explain_with_min_duration: args.pg_auto_explain_with_min_duration,
! 61     })
  62 }
  63 
  64 #[tokio::main]
  65 async fn main() {
! 66     let config: Config = construct_config().unwrap_or_else(|err| {
! 67         error!(error = %err, "Invalid database configuration");
! 68         std::process::exit(1);
  69     });
  70     let parent = CancellationToken::new();
  71 
  72     let _otel_guard = telemetry::init_tracing_otel_with_logs_only_fallback(

coprocessor/fhevm-engine/tfhe-worker/src/health_check.rs

  57                     } else {
  58                         status.set_custom_check("database", false, true);
  59                     }
  60                 }
! 61                 Err(_) => {
! 62                     status.set_custom_check("database", false, true);
! 63                 }
  64             }
  65         };
  66         status
  67     }

coprocessor/fhevm-engine/transaction-sender/src/bin/transaction_sender.rs

  315         gas_limit_overprovision_percent: conf.gas_limit_overprovision_percent,
  316         graceful_shutdown_timeout: conf.graceful_shutdown_timeout,
  317     };
  318 
! 319     let database_url = resolve_database_url_from_option(conf.database_url)?;
! 320     let (db_pool, _pool_refresh_handle) = connect_pool_with_options(
! 321         &database_url,
! 322         sqlx::postgres::PgPoolOptions::new().max_connections(conf.database_pool_size),
! 323         Some(&cancel_token),
! 324     )
! 325     .await?;
  326 
  327     let transaction_sender = std::sync::Arc::new(
  328         TransactionSender::new(
  329             db_pool.clone(),

coprocessor/fhevm-engine/zkproof-worker/src/bin/zkproof_worker.rs

   88         &args.service_name,
   89         "otlp-layer",
   90     );
   91 
!  92     let database_url = match resolve_database_url_from_option(args.database_url.clone()) {
!  93         Ok(database_url) => database_url,
!  94         Err(err) => {
!  95             error!(error = %err, "Invalid database configuration");
!  96             std::process::exit(1);
   97         }
   98     };
   99 
  100     let conf = zkproof_worker::Config {

@zama-ai zama-ai deleted a comment from claude Bot Apr 8, 2026
@antoniupop antoniupop marked this pull request as ready for review April 8, 2026 11:06
@antoniupop antoniupop requested review from a team as code owners April 8, 2026 11:06
Comment thread charts/coprocessor/templates/_helpers.tpl
resolve_database_url_cmd() {
if [[ -x /usr/local/bin/resolve_database_url ]]; then
/usr/local/bin/resolve_database_url
elif [[ -x "${workspace_dir}/target/release/resolve_database_url" ]]; then
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider also checking target/debug/resolve_database_url, since that is the default output path for a normal cargo build, otherwise we may miss an already built local binary and fall back to recompiling.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants